package com.at.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *
 * @author cdhuangchao3
 * @date 2023/5/29 9:24 PM
 */
object SparkStreaming06_State_Join {

  def main(args: Array[String]): Unit = {
    // 创建环境
    // 创建时，需要传递2个参数：
    //    param： 环境配置
    val sc = new SparkConf().setMaster("local[*]").setAppName("operator")
    //    param2: 采集周期
    val ssc = new StreamingContext(sc, Seconds(3))

    val data9999 = ssc.socketTextStream("localhost", 9999)
    val data8888 = ssc.socketTextStream("localhost", 8888)

    val map9999 = data9999.map((_, 1))
    val map8888 = data8888.map((_, 1))
    // 所谓的DStream的Join操作，其实就2个RDD的Join
    val joinDS = map9999.join(map8888)
    joinDS.print()

    // 1、启动采集器
    ssc.start()
    // 2、等待采集器的关闭
    ssc.awaitTermination()
  }

}
